# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
FFT iterface for fast Fourier Transforms using C fftpack fork (using numpy).
:class:`~hysop.numerics.NumpyFFT`
:class:`~hysop.numerics.NumpyFFTPlan`
"""
import numpy as np
from numpy import fft as _FFT
from hysop.tools.htypes import first_not_None
from hysop.numerics.fft.host_fft import HostFFTPlanI, HostFFTI, HostArray
from hysop.numerics.fft.fft import (
complex_to_float_dtype,
float_to_complex_dtype,
mk_view,
mk_shape,
)
[docs]
def dct(a, out=None, type=2, axis=-1):
ndim = a.ndim
shape = a.shape
N = a.shape[axis]
if type == 1:
# O(sqrt(log(N))) error, O(2N) complexity, O(4*N) memory
slc0 = mk_view(ndim, axis, 1, -1)
slc1 = mk_view(ndim, axis, None, None, -1)
s0 = mk_shape(shape, axis, 2 * N - 2)
X = np.empty(shape=s0, dtype=a.dtype)
np.concatenate((a, a[slc0][slc1]), axis=axis, out=X)
res = _FFT.rfft(X, axis=axis).real
if out is None:
out = res
else:
assert out.shape == res.shape
out[...] = res
elif type == 2:
# O(sqrt(log(N))) error, O(N) complexity, O(3N) memory
n0 = N // 2 + 1
n1 = (N - 1) // 2 + 1
slc0 = mk_view(ndim, axis, 1, None, None)
slc1 = mk_view(ndim, axis, None, None, +2)
slc2 = mk_view(ndim, axis, 1, None, +2)
slc3 = mk_view(ndim, axis, None, None, -1)
slc4 = mk_view(ndim, axis, None, None, None, default=None)
slc5 = mk_view(ndim, axis, None, n1, None)
slc6 = mk_view(ndim, axis, n1, None, None)
X = np.empty_like(a)
np.concatenate((a[slc1], a[slc2][slc3]), axis=axis, out=X)
X = _FFT.rfft(X, axis=axis)
X *= (2 * np.exp(-1j * np.pi * np.arange(n0) / (2 * N)))[slc4]
if out is None:
out = np.empty_like(a)
else:
assert out.shape == a.shape
out[slc5] = +X.real[slc5]
out[slc6] = -X.imag[slc0][slc3]
elif type == 3:
# O(sqrt(log(N))) error, O(N) complexity, O(3N) memory
ctype = float_to_complex_dtype(a.dtype)
n0 = N // 2 + 1
n1 = (N - 1) // 2 + 1
slc0 = mk_view(ndim, axis, None, n0, None)
slc1 = mk_view(ndim, axis, 1, None, None)
slc2 = mk_view(ndim, axis, n1, None, None)
slc3 = mk_view(ndim, axis, None, None, -1)
slc4 = mk_view(ndim, axis, None, None, +2)
slc5 = mk_view(ndim, axis, None, n1, None)
slc6 = mk_view(ndim, axis, 1, None, +2)
slc7 = mk_view(ndim, axis, None, None, None, default=None)
s0 = mk_shape(shape, axis, n0)
X = np.zeros(shape=s0, dtype=ctype)
X.real[slc0] = +a[slc0]
X.imag[slc1] = -a[slc2][slc3]
X *= np.exp(+1j * np.pi * np.arange(n0) / (2 * N))[slc7]
X = _FFT.irfft(X, axis=axis, n=N)
X *= N
if out is None:
out = np.empty_like(a)
else:
assert out.shape == a.shape
out[slc4] = X[slc5]
out[slc6] = X[slc2][slc3]
else:
stypes = ["I", "II", "III", "IV", "V", "VI", "VII", "VIII"]
msg = "DCT-{} has not been implemented yet."
msg = msg.format(stypes[type - 1])
raise NotImplementedError(msg)
return out
[docs]
def dst(a, out=None, type=2, axis=-1):
ndim = a.ndim
shape = a.shape
N = a.shape[axis]
if type == 1:
# O(sqrt(log(N))) error, O(2N) complexity, O(4*N) memory
slc0 = mk_view(ndim, axis, None, None, -1)
slc1 = mk_view(ndim, axis, 1, -1, None)
s0 = mk_shape(shape, axis, 2 * N + 2)
s1 = mk_shape(shape, axis, 1)
X = np.empty(shape=s0, dtype=a.dtype)
Z = np.zeros(shape=s1, dtype=a.dtype)
np.concatenate((Z, -a, Z, a[slc0]), axis=axis, out=X)
res = _FFT.rfft(X, axis=axis).imag
if out is None:
out = np.empty_like(a)
else:
assert out.shape == a.shape
out[...] = res[slc1]
elif type == 2:
# O(sqrt(log(N))) error, O(N) complexity, O(3N) memory
n0 = N // 2 + 1
n1 = (N - 1) // 2 + 1
slc0 = mk_view(ndim, axis, 1, None, None)
slc1 = mk_view(ndim, axis, None, None, +2)
slc2 = mk_view(ndim, axis, 1, None, +2)
slc3 = mk_view(ndim, axis, None, None, -1)
slc4 = mk_view(ndim, axis, None, None, None, default=None)
slc5 = mk_view(ndim, axis, None, n1 - 1, None)
slc6 = mk_view(ndim, axis, n1 - 1, None, None)
slc7 = mk_view(ndim, axis, 1, n1, None)
X = np.empty_like(a)
np.concatenate((a[slc1], -a[slc2][slc3]), axis=axis, out=X)
X = _FFT.rfft(X, axis=axis)
X *= (2 * np.exp(-1j * np.pi * np.arange(n0) / (2 * N)))[slc4]
if out is None:
out = np.empty_like(a)
else:
assert out.shape == a.shape
out[slc5] = -X.imag[slc7]
out[slc6] = +X.real[slc3]
elif type == 3:
# O(sqrt(log(N))) error, O(N) complexity, O(3N) memory
ctype = float_to_complex_dtype(a.dtype)
n0 = N // 2 + 1
n1 = (N - 1) // 2 + 1
slc0 = mk_view(ndim, axis, None, n0, None)
slc1 = mk_view(ndim, axis, None, None, -1)
slc2 = mk_view(ndim, axis, 1, None, None)
slc3 = mk_view(ndim, axis, None, N - n1, None)
slc4 = mk_view(ndim, axis, None, None, None, default=None)
slc5 = mk_view(ndim, axis, None, None, 2)
slc6 = mk_view(ndim, axis, None, n1, None)
slc7 = mk_view(ndim, axis, 1, None, 2)
slc8 = mk_view(ndim, axis, n1, None, None)
s0 = mk_shape(shape, axis, n0)
X = np.zeros(shape=s0, dtype=ctype)
X.real[slc0] = +a[slc1][slc0]
X.imag[slc2] = -a[slc3]
X *= np.exp(+1j * np.pi * np.arange(n0) / (2 * N))[slc4]
X = _FFT.irfft(X, axis=axis, n=N)
X[...] *= N
if out is None:
out = np.empty_like(a)
else:
assert out.shape == a.shape
out[slc5] = +X[slc6]
out[slc7] = -X[slc8][slc1]
else:
stypes = ["I", "II", "III", "IV", "V", "VI", "VII", "VIII"]
msg = "DCT-{} has not been implemented yet."
msg = msg.format(stypes[type - 1])
raise NotImplementedError(msg)
return out
[docs]
def idct(a, out=None, type=2, axis=-1, **kwds):
itype = [1, 3, 2, 4][type - 1]
return dct(a=a, out=out, type=itype, axis=axis, **kwds)
[docs]
def idst(a, out=None, type=2, axis=-1, **kwds):
itype = [1, 3, 2, 4][type - 1]
return dst(a=a, out=out, type=itype, axis=axis, **kwds)
[docs]
class NumpyFFTPlan(HostFFTPlanI):
"""
Wrap a numpy fft call (numpy.fft does not offer real planning capabilities).
"""
def __init__(self, fn, a, out, scaling=None, **kwds):
super().__init__()
self.fn = fn
self.a = a
self.out = out
self.scaling = scaling
if isinstance(a, HostArray):
a = a.handle
if isinstance(out, HostArray):
out = out.handle
kwds = kwds.copy()
kwds["a"] = a
if fn in (dct, idct, dst, idst):
kwds["out"] = out
self.kwds = kwds
@property
def input_array(self):
return self.a
@property
def output_array(self):
return self.out
[docs]
def execute(self):
out = self.out
scaling = self.scaling
if self.fn in (dct, idct, dst, idst):
self.fn(**self.kwds)
else:
out[...] = self.fn(**self.kwds)
if scaling is not None:
out[...] *= scaling
[docs]
class NumpyFFT(HostFFTI):
"""
Interface to compute local to process FFT-like transforms using the numpy fft backend.
Numpy fft backend has many disadvantages:
- only double precision is really supported
- single precision is supported by casting to double precision
- creates intermediate temporary buffers at each call
- no planning capabilities (numpy.fft methods are just wrapped into fake plans)
The only advantage is that planning won't destroy original inputs.
"""
def __init__(self, backend=None, allocator=None, warn_on_allocation=True, **kwds):
super().__init__(
backend=backend,
allocator=allocator,
warn_on_allocation=warn_on_allocation,
**kwds,
)
self.supported_ftypes = (
np.float32,
np.float64,
)
self.supported_ctypes = (
np.complex64,
np.complex128,
)
[docs]
def fft(self, a, out=None, axis=-1, **kwds):
(shape, dtype) = super().fft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = NumpyFFTPlan(fn=_FFT.fft, a=a, out=out, axis=axis, **kwds)
return plan
[docs]
def ifft(self, a, out=None, axis=-1, **kwds):
(shape, dtype, s) = super().ifft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = NumpyFFTPlan(fn=_FFT.ifft, a=a, out=out, axis=axis, **kwds)
return plan
[docs]
def rfft(self, a, out=None, axis=-1, **kwds):
(shape, dtype) = super().rfft(a=a, out=out, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = NumpyFFTPlan(fn=_FFT.rfft, a=a, out=out, axis=axis, **kwds)
return plan
[docs]
def irfft(self, a, out=None, n=None, axis=-1, **kwds):
(shape, dtype, s) = super().irfft(a=a, out=out, n=n, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = NumpyFFTPlan(
fn=_FFT.irfft, a=a, out=out, axis=axis, n=shape[axis], **kwds
)
return plan
[docs]
def dct(self, a, out=None, type=2, axis=-1, **kwds):
(shape, dtype) = super().dct(a=a, out=out, type=type, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = NumpyFFTPlan(fn=dct, a=a, out=out, axis=axis, type=type, **kwds)
return plan
[docs]
def idct(self, a, out=None, type=2, axis=-1, scaling=None, **kwds):
(shape, dtype, _, s) = super().idct(a=a, out=out, type=type, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = NumpyFFTPlan(
fn=idct,
a=a,
out=out,
axis=axis,
type=type,
scaling=first_not_None(scaling, 1.0 / s),
**kwds,
)
return plan
[docs]
def dst(self, a, out=None, type=2, axis=-1, **kwds):
(shape, dtype) = super().dst(a=a, out=out, type=type, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = NumpyFFTPlan(fn=dst, a=a, out=out, axis=axis, type=type, **kwds)
return plan
[docs]
def idst(self, a, out=None, type=2, axis=-1, scaling=None, **kwds):
(shape, dtype, _, s) = super().idst(a=a, out=out, type=type, axis=axis, **kwds)
out = self.allocate_output(out, shape, dtype)
plan = NumpyFFTPlan(
fn=idst,
a=a,
out=out,
axis=axis,
type=type,
scaling=first_not_None(scaling, 1.0 / s),
**kwds,
)
return plan